package com.app.framework.socket.sender.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import com.app.framework.socket.Packet;


public class SenderQueue {

	private static final Logger logger = Logger.getLogger(SenderQueue.class);
	
	private AtomicInteger size = new AtomicInteger();
	
	private ConcurrentLinkedQueue<Packet> queue = new ConcurrentLinkedQueue<Packet>();
	
	private SendQueueable sendQueueable;
	
	private ArrayBlockingQueue<SendRannable> rannables = new ArrayBlockingQueue<SendRannable>(9);
	
	private final static int PACKET_MAX = 20;
	
	public SenderQueue(SendQueueable sendQueueable) {
		this.sendQueueable = sendQueueable;
		Thread t = new Thread(new SendRannable());
		t.setDaemon(true);
		t.setName("SenderQueueThread-Master");
		t.setPriority(Thread.MAX_PRIORITY);
		t.start();
	}
	
	public Packet poll() {
		Packet p = queue.poll();
		if(p != null) {
			int tmp = size.decrementAndGet();
			if(tmp < 0) {
				logger.error("SendQueue size < 0"+tmp);
			}
			else if(tmp / 1000 < rannables.size()) {
				//移除线程
				SendRannable sr = rannables.poll();
				if(sr != null) {
					sr.setStop();
				}
			}
		}
		return p;
	}
	
	public void add(Packet packet) {
		queue.add(packet);
		if(size.incrementAndGet() / 1000 > rannables.size()) {
			//起新线程
			SendRannable sr = new SendRannable();
			if(rannables.offer(sr)) {
				Thread t = new Thread(sr);
				t.setDaemon(true);
				t.setName("SenderQueueThread-Slave");
				t.setPriority(Thread.MAX_PRIORITY);
				t.start();
			}
		}
		
	}
	
	public int size() {
		return size.get();
	}
	
	public void clear() {
		queue.clear();
	}
	
	class SendRannable implements Runnable {
	    private List<Packet> lst = new ArrayList<Packet>();
	    private volatile boolean stop;
		@Override
		public void run() {
			while (!stop && sendQueueable.isChannelActive()) {
				try {
					if(!sendQueueable.getChannel().isWritable()) {//缓冲区已满
						logger.warn("server2server socket buffer full.queue size:"+size());
						Thread.sleep(10);
						continue;
					}
					Packet p = poll();
					if (p != null) {
						lst.add(p);
						if (lst.size() > PACKET_MAX) {
							send();
//							System.err.println("reach max length send:"+size.get());
						}
					} else {
						send();
						//FIXME 下面的代码要删除
						if(lst.size() > 0)
						System.err.println("packet empty send:"+size.get());
//						System.err.println("packet empty ");
						Thread.sleep(100);
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		
		private void send() {
			if(lst.size() > 0) {
				sendQueueable.getChannel().write(lst);
				lst.clear();
			}
		}
		
		public void setStop() {
			stop = true;
		}
	}
}
